package org.budo.warehouse.logic.producer.canal;

import org.budo.canal.message.handler.RowChangeUtil;
import org.budo.warehouse.logic.api.DataEntry;

import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

/**
 * {@link DataEntry} view over a single canal {@link Entry}.
 * <p>
 * Header-level values (event type, schema name, table name, execute time) are
 * read from the entry header. Row-level values are read from the
 * {@link RowChange} parsed from the entry's store value; the parsed value is
 * cached after first use.
 * <p>
 * For a delete event, column count / name / key flag are read from the row's
 * "before" columns; for every other event type they are read from the "after"
 * columns.
 *
 * @author limingwei
 */
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class CanalDataEntry implements DataEntry {
    private Entry entry;

    /** Cache of the row change parsed from {@link #entry}; {@code null} until first needed. */
    private RowChange _rowChange;

    /**
     * Creates a view over the given entry; the row change is parsed on first use.
     *
     * @param entry the canal entry to wrap
     */
    public CanalDataEntry(Entry entry) {
        this.setEntry(entry);
    }

    /**
     * Replaces the wrapped entry.
     * <p>
     * Declared explicitly (instead of relying on the Lombok-generated setter) so
     * that the cached row change is dropped when a different entry instance is
     * set; otherwise row-level getters would keep returning data parsed from the
     * previous entry.
     *
     * @param entry the new canal entry
     */
    public void setEntry(Entry entry) {
        if (this.entry != entry) {
            this._rowChange = null;
        }
        this.entry = entry;
    }

    /** @return the name of the event type found in the entry header */
    @Override
    public String getEventType() {
        return this.entry.getHeader().getEventType().name();
    }

    /** @return the SQL carried by the parsed row change */
    @Override
    public String getSql() {
        return this._rowChange().getSql();
    }

    /** @return the schema name found in the entry header */
    @Override
    public String getSchemaName() {
        return this.entry.getHeader().getSchemaName();
    }

    /** @return the table name found in the entry header */
    @Override
    public String getTableName() {
        return this.entry.getHeader().getTableName();
    }

    /** @return the execute time found in the entry header */
    @Override
    public Long getExecuteTime() {
        return this.entry.getHeader().getExecuteTime();
    }

    /** @return the number of row datas in the parsed row change */
    @Override
    public Integer getRowCount() {
        return this._rowChange().getRowDatasCount();
    }

    /**
     * @param rowIndex index of the row inside the row change
     * @return the "before" column count for a delete event, otherwise the "after" column count
     */
    @Override
    public Integer getColumnCount(Integer rowIndex) {
        RowData rowData = this._rowChange().getRowDatas(rowIndex);
        return this._isDelete() ? rowData.getBeforeColumnsCount() : rowData.getAfterColumnsCount();
    }

    /**
     * @param rowIndex    index of the row inside the row change
     * @param columnIndex index of the column inside the row
     * @return the key flag of the selected column ("before" column for a delete event, "after" column otherwise)
     */
    @Override
    public Boolean getColumnIsKey(Integer rowIndex, Integer columnIndex) {
        return this._describingColumn(rowIndex, columnIndex).getIsKey();
    }

    /**
     * @param rowIndex    index of the row inside the row change
     * @param columnIndex index of the column inside the row
     * @return the name of the selected column ("before" column for a delete event, "after" column otherwise)
     */
    @Override
    public String getColumnName(Integer rowIndex, Integer columnIndex) {
        return this._describingColumn(rowIndex, columnIndex).getName();
    }

    /**
     * @param rowIndex    index of the row inside the row change
     * @param columnIndex index of the column inside the row
     * @return the "after" value of the column, or {@code null} when the index is at or past the
     *         "after" column count or the column is flagged as null
     */
    @Override
    public String getColumnValueAfter(Integer rowIndex, Integer columnIndex) {
        RowData rowData = this._rowChange().getRowDatas(rowIndex);
        if (columnIndex >= rowData.getAfterColumnsCount()) {
            return null;
        }

        return _valueOrNull(rowData.getAfterColumns(columnIndex));
    }

    /**
     * @param rowIndex    index of the row inside the row change
     * @param columnIndex index of the column inside the row
     * @return the "before" value of the column, or {@code null} when the index is at or past the
     *         "before" column count or the column is flagged as null
     */
    @Override
    public String getColumnValueBefore(Integer rowIndex, Integer columnIndex) {
        RowData rowData = this._rowChange().getRowDatas(rowIndex);
        if (columnIndex >= rowData.getBeforeColumnsCount()) {
            return null;
        }

        return _valueOrNull(rowData.getBeforeColumns(columnIndex));
    }

    /**
     * Single place deciding whether this entry is a delete event, shared by all
     * column-metadata getters so they cannot drift apart.
     * <p>
     * NOTE(review): EventType is not imported in this file; presumably it is a
     * String-constant holder visible from this package or inherited from
     * DataEntry, with DELETE matching "delete" case-insensitively - confirm.
     */
    private boolean _isDelete() {
        return EventType.DELETE.equalsIgnoreCase(this.getEventType());
    }

    /** Picks the column used for metadata: the "before" column for a delete event, else the "after" column. */
    private Column _describingColumn(Integer rowIndex, Integer columnIndex) {
        RowData rowData = this._rowChange().getRowDatas(rowIndex);
        return this._isDelete() ? rowData.getBeforeColumns(columnIndex) : rowData.getAfterColumns(columnIndex);
    }

    /** Maps a column flagged as null to Java {@code null}; otherwise returns its value. */
    private static String _valueOrNull(Column column) {
        return column.getIsNull() ? null : column.getValue();
    }

    /** Returns the cached row change, parsing it from the entry's store value on first use. */
    private RowChange _rowChange() {
        if (null == this._rowChange) {
            this._rowChange = RowChangeUtil.rowChangeParseFromByteString(this.getEntry().getStoreValue());
        }

        return this._rowChange;
    }

    /**
     * Renders the entry header and the row change.
     * <p>
     * Safe to call on an instance created through the no-args constructor: a
     * missing entry (with nothing cached) is printed as {@code null} instead of
     * throwing.
     */
    @Override
    public String toString() {
        Object header = null == this.entry ? null : this.entry.getHeader();

        String rowChangeText;
        if (null == this.entry && null == this._rowChange) {
            // nothing to parse from and nothing cached
            rowChangeText = "null";
        } else {
            rowChangeText = RowChangeUtil.rowChangeToString(this._rowChange());
        }

        return super.toString() + ", " + "#45 entry.header=" + header //
                + ", rowChange=" + rowChangeText;
    }
}